Server-side code
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    // generated by protoc-gen-go from hello.proto; the import path is project-specific
    server_hello_proto "your/module/server_hello_proto"
)

type TigerServer struct{}

func (t *TigerServer) HelloTiger(ctx context.Context, req *server_hello_proto.HelloRequest) (*server_hello_proto.HelloResponse, error) {
    resp := &server_hello_proto.HelloResponse{}
    resp.Name = req.Name
    resp.Age = req.Age + 11
    return resp, nil
}

func main() {
    grpcServer := grpc.NewServer()
    server_hello_proto.RegisterTigerServiceServer(grpcServer, new(TigerServer))
    listener, err := net.Listen("tcp", "127.0.0.1:1235")
    if err != nil {
        log.Fatal(err)
    }
    _ = grpcServer.Serve(listener)
}
As the code above shows, it takes very little to start a gRPC server:
- instantiate a grpc.Server
- register the object implementing the service interface from the pb file with the grpc.Server
- listen on a port
- accept incoming connection requests
Instantiating the server
First, let's look at what instantiating a server actually does:
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:    make(map[net.Listener]bool),
        opts:   opts,
        conns:  make(map[transport.ServerTransport]bool),
        m:      make(map[string]*service),
        quit:   grpcsync.NewEvent(),
        done:   grpcsync.NewEvent(),
        czData: new(channelzData),
    }
    chainUnaryServerInterceptors(s)
    chainStreamServerInterceptors(s)
    s.cv = sync.NewCond(&s.mu)
    if EnableTracing {
        _, file, line, _ := runtime.Caller(1)
        s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    }
    if channelz.IsOn() {
        s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
    }
    return s
}
This function looks a lot like the client-side Dial flow: it takes variadic options used to override the default configuration, initializes the grpc.Server struct, chains up the interceptors, and performs some initialization for call tracing and monitoring.
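To make the option-override step concrete, here is a minimal sketch of passing ServerOptions into NewServer. grpc.MaxRecvMsgSize and grpc.MaxSendMsgSize are standard grpc-go option constructors; each option's apply() mutates the copy of defaultServerOptions made at the top of NewServer, overriding the maxReceiveMessageSize/maxSendMessageSize fields of the serverOptions struct shown a bit further down:

package main

import "google.golang.org/grpc"

func main() {
    s := grpc.NewServer(
        grpc.MaxRecvMsgSize(8*1024*1024), // overrides serverOptions.maxReceiveMessageSize
        grpc.MaxSendMsgSize(8*1024*1024), // overrides serverOptions.maxSendMessageSize
    )
    _ = s
}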
The key step here is initializing the grpc.Server struct:
type Server struct {
    opts serverOptions // optional server configuration

    mu     sync.Mutex                         // guards following
    lis    map[net.Listener]bool              // listeners
    conns  map[transport.ServerTransport]bool // active connections
    serve  bool
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    m      map[string]*service // service name -> service info
    events trace.EventLog

    quit               *grpcsync.Event
    done               *grpcsync.Event
    channelzRemoveOnce sync.Once
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}
And the server options:
type serverOptions struct {
    creds           credentials.TransportCredentials // transport credentials (TLS)
    codec           baseCodec                        // codec, e.g. protobuf
    cp              Compressor                       // compressor, e.g. gzip
    dc              Decompressor                     // decompressor, e.g. gzip
    unaryInt        UnaryServerInterceptor           // unary interceptor
    streamInt       StreamServerInterceptor          // stream interceptor
    chainUnaryInts  []UnaryServerInterceptor
    chainStreamInts []StreamServerInterceptor
    inTapHandle     tap.ServerInHandle
    statsHandler    stats.Handler
    // HTTP/2 protocol settings
    maxConcurrentStreams  uint32
    maxReceiveMessageSize int
    maxSendMessageSize    int
    unknownStreamDesc     *StreamDesc
    // keepalive settings for long-lived connections; a dedicated goroutine
    // handles keepalive on each connection
    keepaliveParams       keepalive.ServerParameters
    keepalivePolicy       keepalive.EnforcementPolicy
    initialWindowSize     int32
    initialConnWindowSize int32
    writeBufferSize       int
    readBufferSize        int
    connectionTimeout     time.Duration
    maxHeaderListSize     *uint32
    headerTableSize       *uint32
}
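The keepalive fields above are populated through the keepalive-related ServerOptions. A small sketch using the real grpc.KeepaliveParams / grpc.KeepaliveEnforcementPolicy constructors (the durations here are illustrative, not recommendations):

package main

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

func main() {
    s := grpc.NewServer(
        // fills serverOptions.keepaliveParams
        grpc.KeepaliveParams(keepalive.ServerParameters{
            MaxConnectionIdle: 15 * time.Minute, // close connections idle this long
            Time:              2 * time.Hour,    // ping the client after this much inactivity
            Timeout:           20 * time.Second, // how long to wait for the ping ack
        }),
        // fills serverOptions.keepalivePolicy
        grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             5 * time.Minute, // minimum interval between client pings
            PermitWithoutStream: false,           // reject pings with no active streams
        }),
    )
    _ = s
}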
Registering the service with the server
The function that registers a service object with the server is generated automatically from the pb file by protoc's code-generation plugin, protoc-gen-go.
The server-side code in the generated pb file:
// the service descriptor
var _TigerService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "server_hello_proto.TigerService", // service name
    HandlerType: (*TigerServiceServer)(nil),        // the interface the implementation must satisfy
    Methods: []grpc.MethodDesc{ // unary method descriptors
        {
            MethodName: "HelloTiger",                       // rpc method name
            Handler:    _TigerService_HelloTiger_Handler,   // handler for the rpc request
        },
        {
            MethodName: "FeedTiger",
            Handler:    _TigerService_FeedTiger_Handler,
        },
    },
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "Channel",
            Handler:       _TigerService_Channel_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
    },
    Metadata: "hello.proto",
}

func RegisterTigerServiceServer(s *grpc.Server, srv TigerServiceServer) {
    // pass in the service descriptor and the concrete implementation
    s.RegisterService(&_TigerService_serviceDesc, srv)
}
As the following code shows, this simply calls the registration method in the grpc package:
// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
// In other words: it is called by code generated from the IDL (protobuf), and
// must run before the server starts accepting requests. Its main job is to
// verify that the implementation actually satisfies the service interface.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) {
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    // registration is not allowed once the server is serving
    if s.serve {
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    // duplicate service names are not allowed
    if _, ok := s.m[sd.ServiceName]; ok {
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}
As you can see, register mainly adds the service name and its service info to the server's map field m. This service-name-to-service-info mapping plays the same role as a routing table in an HTTP web framework.
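To make the analogy concrete, here is a toy model (not grpc-go's actual types) of the two-level lookup this "routing table" enables, using the service and method names from the pb file above:

package main

import "fmt"

// toy stand-ins for grpc's service / MethodDesc types
type methodDesc struct{ MethodName string }
type service struct{ md map[string]methodDesc }

func main() {
    // roughly what Server.m looks like after RegisterTigerServiceServer runs
    m := map[string]*service{
        "server_hello_proto.TigerService": {md: map[string]methodDesc{
            "HelloTiger": {MethodName: "HelloTiger"},
            "FeedTiger":  {MethodName: "FeedTiger"},
        }},
    }
    // the lookup handleStream performs for an incoming request (shown later)
    if srv, ok := m["server_hello_proto.TigerService"]; ok {
        fmt.Println(srv.md["HelloTiger"].MethodName) // HelloTiger
    }
}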
The server starts listening and accepting requests
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
As the code shows, the server spawns a goroutine for each accepted connection; the real handling of the client connection happens in the handleRawConn method.
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
// On the server side, each HTTP/2 connection becomes a ServerTransport, here an http2Server.
// NewServerTransport creates a ServerTransport with conn or non-nil error if it fails.
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is returned if something goes wrong.
When an HTTP/2 connection arrives, the server first completes the HTTP/2 handshake (processing the frames exchanged during connection setup), then loops forever handling both new stream creation (new HTTP/2 requests arriving) and data-frame reads and writes (thanks to HTTP/2 multiplexing, a client can send multiple concurrent requests over the same connection). At the code level, an HTTP/2 connection is a ServerTransport, i.e. an http2Server.
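The accept loop inside Serve follows a standard Go pattern. Here is a self-contained sketch of that pattern only; grpc-go's real Serve additionally retries temporary Accept errors with exponential backoff and coordinates with its quit/done events:

package main

import (
    "log"
    "net"
    "sync"
)

func main() {
    lis, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatal(err)
    }
    var serveWG sync.WaitGroup // counts active goroutines, like Server.serveWG
    for {
        conn, err := lis.Accept()
        if err != nil {
            break
        }
        serveWG.Add(1)
        go func(c net.Conn) {
            defer serveWG.Done()
            defer c.Close()
            // this is where handleRawConn would do the handshake and then
            // serve HTTP/2 streams on c
        }(conn)
    }
    serveWG.Wait()
}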
Next, let's look at the serveStreams method:
func (s *Server) serveStreams(st transport.ServerTransport) {
    defer st.Close()
    var wg sync.WaitGroup
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        // the callback spawns a child goroutine to handle the rpc request
        go func() {
            defer wg.Done()
            // gRPC runs on HTTP/2: each rpc uses one stream, and one connection
            // can carry many concurrent rpcs, i.e. many incoming streams.
            // Whenever a new rpc arrives, this callback fires and hands the
            // stream to the server's handleStream method.
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
    wg.Wait()
}
As you can see, a WaitGroup is used to block here, continuously processing requests arriving on the same HTTP/2 connection.
HandleStreams processes the request streams with the registered handler:

HandleStreams(func(*Stream), func(context.Context, string) context.Context)

This is an interface method; the concrete implementation here is http2Server's HandleStreams, which receives a handler function for new streams and a trace-context function:
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
        t.controlBuf.throttle()
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
        if err != nil {
            if se, ok := err.(http2.StreamError); ok {
                warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
                t.mu.Lock()
                s := t.activeStreams[se.StreamID]
                t.mu.Unlock()
                if s != nil {
                    t.closeStream(s, true, se.Code, false)
                } else {
                    t.controlBuf.put(&cleanupStream{
                        streamID: se.StreamID,
                        rst:      true,
                        rstCode:  se.Code,
                        onWrite:  func() {},
                    })
                }
                continue
            }
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                t.Close()
                return
            }
            warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
            t.Close()
            return
        }
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
            // TODO: Handle GoAway from the client appropriately.
        default:
            errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
        }
    }
}
This method loops reading frames sent over the client connection. A HEADERS frame means a new rpc request has arrived, so the handler is called back; a DATA frame is dispatched to its stream; and RST_STREAM, SETTINGS, PING and WINDOW_UPDATE frames each get their own dedicated handler.
Now let's look at the server's handleStream method, which handles a new rpc request:
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    // split the method string to extract the service name and method name
    sm := stream.Method()
    // strip the leading '/'
    if sm != "" && sm[0] == '/' {
        sm = sm[1:]
    }
    pos := strings.LastIndex(sm, "/")
    if pos == -1 {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
            trInfo.tr.SetError()
        }
        errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
        if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
            if trInfo != nil {
                trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                trInfo.tr.SetError()
            }
            channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
        }
        if trInfo != nil {
            trInfo.tr.Finish()
        }
        return
    }
    service := sm[:pos]
    method := sm[pos+1:]
    // look up the service by name in the server's service map
    srv, knownService := s.m[service]
    if knownService { // found it
        // look for a unary method with this name
        if md, ok := srv.md[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        // look for a streaming method with this name
        if sd, ok := srv.sd[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // If the requested service or method does not exist and the server was
    // configured with a handler for unknown services, hand the stream to it.
    // Unknown service, or known server unknown method.
    if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
        s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
        return
    }
    var errDesc string
    if !knownService {
        errDesc = fmt.Sprintf("unknown service %v", service)
    } else {
        errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
    }
    if trInfo != nil {
        trInfo.tr.LazyPrintf("%s", errDesc)
        trInfo.tr.SetError()
    }
    if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
            trInfo.tr.SetError()
        }
        channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
    }
    if trInfo != nil {
        trInfo.tr.Finish()
    }
}
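The unknownStreamDesc fallback in the code above is populated by the grpc.UnknownServiceHandler server option, often used to build generic gRPC proxies. A minimal sketch of wiring one up:

package main

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func main() {
    // With this option set, handleStream routes any unmatched service or
    // method to this handler instead of writing codes.Unimplemented itself.
    s := grpc.NewServer(grpc.UnknownServiceHandler(
        func(srv interface{}, stream grpc.ServerStream) error {
            return status.Error(codes.Unimplemented, "no backend for this method")
        },
    ))
    _ = s
}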
Next, processUnaryRPC for ordinary unary rpc requests (some non-core code that doesn't affect the main flow has been removed; the streaming-rpc handler is not shown since it follows the same approach):
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
    // ... (the elided code sets up sh, the stats handler, and binlog, the binary logger)
    // set up compression and decompression
    // comp and cp are used for compression. decomp and dc are used for
    // decompression. If comp and decomp are both set, they are the same;
    // however they are kept separate to ensure that at most one of the
    // compressor/decompressor variable pairs are set for use later.
    var comp, decomp encoding.Compressor
    var cp Compressor
    var dc Decompressor
    // If dc is set and matches the stream's compression, use it. Otherwise, try
    // to find a matching registered compressor for decomp.
    if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
        dc = s.opts.dc
    } else if rc != "" && rc != encoding.Identity {
        decomp = encoding.GetCompressor(rc)
        if decomp == nil {
            st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
            t.WriteStatus(stream, st)
            return st.Err()
        }
    }
    // If cp is set, use it. Otherwise, attempt to compress the response using
    // the incoming message compression method.
    //
    // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
    if s.opts.cp != nil {
        cp = s.opts.cp
        stream.SetSendCompress(cp.Type())
    } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
        // Legacy compressor not specified; attempt to respond with same encoding.
        comp = encoding.GetCompressor(rc)
        if comp != nil {
            stream.SetSendCompress(rc)
        }
    }
    var payInfo *payloadInfo
    if sh != nil || binlog != nil {
        payInfo = &payloadInfo{}
    }
    // receive the request data and decompress it
    d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
    if err != nil {
        if st, ok := status.FromError(err); ok {
            if e := t.WriteStatus(stream, st); e != nil {
                channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
            }
        }
        return err
    }
    if channelz.IsOn() {
        t.IncrMsgRecv()
    }
    // df deserializes the request; v ends up pointing at the req parameter
    // of the corresponding service method implementation
    df := func(v interface{}) error {
        // unmarshal the request parameters
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
        if sh != nil {
            sh.HandleRPC(stream.Context(), &stats.InPayload{
                RecvTime:   time.Now(),
                Payload:    v,
                WireLength: payInfo.wireLength,
                Data:       d,
                Length:     len(d),
            })
        }
        // ...
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    // Call the handler generated by the proto toolchain; internally it invokes
    // the matching service method. df is the deserializer; the last argument
    // is the unary interceptor configured when the grpc.Server was created.
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
    if appErr != nil {
        appStatus, ok := status.FromError(appErr)
        if !ok {
            // Convert appErr if it is not a grpc status error.
            appErr = status.Error(codes.Unknown, appErr.Error())
            appStatus, _ = status.FromError(appErr)
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
            trInfo.tr.SetError()
        }
        // write the error status into the stream
        if e := t.WriteStatus(stream, appStatus); e != nil {
            channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
        }
        // ...
        return appErr
    }
    if trInfo != nil {
        trInfo.tr.LazyLog(stringer("OK"), false)
    }
    opts := &transport.Options{Last: true}
    // serialize reply and send it back to the client
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        if err == io.EOF {
            // The entire stream is done (for unary RPC only).
            return err
        }
        if sts, ok := status.FromError(err); ok {
            if e := t.WriteStatus(stream, sts); e != nil {
                channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
            }
        } else {
            switch st := err.(type) {
            case transport.ConnectionError:
                // Nothing to do here.
            default:
                panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
            }
        }
        if binlog != nil {
            h, _ := stream.Header()
            binlog.Log(&binarylog.ServerHeader{
                Header: h,
            })
            binlog.Log(&binarylog.ServerTrailer{
                Trailer: stream.Trailer(),
                Err:     appErr,
            })
        }
        return err
    }
    err = t.WriteStatus(stream, statusOK)
    return err
}
The main logic above: read the request from the stream, deserialize it, call the handler from the MethodDesc, and once handling is done, serialize the response and write it back into the stream for the client.
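Note the appErr handling along the way: a handler should return a gRPC status error so that the intended code reaches the client; anything else gets wrapped as codes.Unknown. A small self-contained illustration of this contract:

package main

import (
    "context"
    "errors"
    "fmt"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func handler(ctx context.Context, age int32) (int32, error) {
    if age < 0 {
        // WriteStatus carries this code and message back to the client
        return 0, status.Errorf(codes.InvalidArgument, "age must be >= 0, got %d", age)
    }
    return age + 11, nil
}

func main() {
    _, err := handler(context.Background(), -1)
    st, _ := status.FromError(err)
    fmt.Println(st.Code(), st.Message()) // InvalidArgument age must be >= 0, got -1

    // a plain error, as processUnaryRPC would see it, is not a status error
    _, ok := status.FromError(errors.New("plain error"))
    fmt.Println(ok) // false -> wrapped as codes.Unknown
}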
Let's look at the message-parsing method, which takes the raw bytes from the buffer and yields the protobuf-serialized payload:
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
    // recvMsg parses out the actual message. The first 5 bytes are a prefix:
    // byte 1 is the compressed flag, bytes 2-5 are the body length, and all
    // the bytes after the prefix are read as the request body.
    pf, d, err := p.recvMsg(maxReceiveMessageSize)
    if err != nil {
        return nil, err
    }
    if payInfo != nil {
        payInfo.wireLength = len(d)
    }
    // verify the compression settings are consistent
    if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
        return nil, st.Err()
    }
    var size int
    if pf == compressionMade {
        // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
        // use this decompressor as the default.
        if dc != nil {
            d, err = dc.Do(bytes.NewReader(d))
            size = len(d)
        } else {
            d, size, err = decompress(compressor, d, maxReceiveMessageSize)
        }
        if err != nil {
            return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
        }
    } else {
        size = len(d)
    }
    if size > maxReceiveMessageSize {
        // TODO: Revisit the error code. Currently keep it consistent with java
        // implementation.
        return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
    }
    return d, nil
}
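The 5-byte prefix described in that comment is gRPC's length-prefixed message framing. Here is a runnable sketch of parsing it; the real recvMsg does this inside the transport, plus flow-control accounting:

package main

import (
    "encoding/binary"
    "fmt"
)

// parseFrame mimics what p.recvMsg extracts: byte 0 is the compressed flag,
// bytes 1-4 are the big-endian payload length, and the payload follows.
func parseFrame(buf []byte) (compressed bool, payload []byte, err error) {
    if len(buf) < 5 {
        return false, nil, fmt.Errorf("short header: %d bytes", len(buf))
    }
    n := binary.BigEndian.Uint32(buf[1:5])
    if uint32(len(buf)-5) < n {
        return false, nil, fmt.Errorf("short payload: want %d bytes", n)
    }
    return buf[0] == 1, buf[5 : 5+n], nil
}

func main() {
    frame := []byte{0, 0, 0, 0, 3, 'a', 'b', 'c'} // uncompressed, 3-byte body
    c, p, _ := parseFrame(frame)
    fmt.Println(c, string(p)) // false abc
}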
Now for the handler generated by the proto toolchain that was mentioned above (using HelloTiger as the example):
func _TigerService_HelloTiger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    // dec is the deserializer passed in (the df closure above)
    if err := dec(in); err != nil {
        return nil, err
    }
    // with no interceptor configured, call the service method directly
    if interceptor == nil {
        return srv.(TigerServiceServer).HelloTiger(ctx, in)
    }
    // information about the service and method being called
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/server_hello_proto.TigerService/HelloTiger",
    }
    // the callback that actually invokes the service method
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(TigerServiceServer).HelloTiger(ctx, req.(*HelloRequest))
    }
    // with an interceptor, call it first and pass the callback along
    return interceptor(ctx, in, info, handler)
}
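For completeness, a minimal sketch of a UnaryServerInterceptor as invoked by this generated handler; it receives the decoded request, the UnaryServerInfo, and the handler callback that ultimately calls HelloTiger:

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
)

func timing(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    start := time.Now()
    resp, err := handler(ctx, req) // runs the actual service method
    log.Printf("%s took %v", info.FullMethod, time.Since(start))
    return resp, err
}

func main() {
    s := grpc.NewServer(grpc.UnaryInterceptor(timing)) // fills s.opts.unaryInt
    _ = s
}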
Let's also look at the NewContextWithServerTransportStream function used above.
// This function derives a new context from ctx and stores the stream on it.
// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
    return context.WithValue(ctx, streamKey{}, stream)
}

// This interface is what server-side code uses to set the headers passed back to the client.
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
    Method() string
    SetHeader(md metadata.MD) error
    SendHeader(md metadata.MD) error
    SetTrailer(md metadata.MD) error
}
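This is the machinery behind grpc.SetHeader and friends: inside a handler, they fetch the ServerTransportStream back out of ctx via streamKey{} and call its methods. A sketch of typical handler-side usage (the header key here is made up):

package main

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// inside any unary handler: queue a response header on the stream that
// NewContextWithServerTransportStream stored in ctx
func tagResponse(ctx context.Context) error {
    return grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "abc123"))
}

func main() {
    // with no stream in the context, SetHeader reports an error --
    // exactly the case the streamKey{} lookup guards against
    if err := tagResponse(context.Background()); err != nil {
        println(err.Error())
    }
}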
As the code shows, NewContextWithServerTransportStream is called with the current request's stream.Context().
So let's see how stream.ctx is created:
// operateHeaders takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
    streamID := frame.Header().StreamID
    state := &decodeState{
        serverSide: true,
    }
    // decode the HEADERS frame and extract its fields
    if err := state.decodeHeader(frame); err != nil {
        if se, ok := status.FromError(err); ok {
            t.controlBuf.put(&cleanupStream{
                streamID: streamID,
                rst:      true,
                rstCode:  statusCodeConvTab[se.Code()],
                onWrite:  func() {},
            })
        }
        return false
    }
    buf := newRecvBuffer()
    s := &Stream{
        id:             streamID,
        st:             t,
        buf:            buf,
        fc:             &inFlow{limit: uint32(t.initialWindowSize)},
        recvCompress:   state.data.encoding,
        method:         state.data.method,
        contentSubtype: state.data.contentSubtype,
    }
    if frame.StreamEnded() {
        // s is just created by the caller. No lock needed.
        s.state = streamReadDone
    }
    if state.data.timeoutSet {
        s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
    } else {
        s.ctx, s.cancel = context.WithCancel(t.ctx)
    }
    pr := &peer.Peer{
        Addr: t.remoteAddr,
    }
    // Attach Auth info if there is any.
    if t.authInfo != nil {
        pr.AuthInfo = t.authInfo
    }
    s.ctx = peer.NewContext(s.ctx, pr)
    // Attach the received metadata to the context.
    if len(state.data.mdata) > 0 {
        // state.data.mdata is saved into the new context here
        s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
    }
    if state.data.statsTags != nil {
        s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
    }
    if state.data.statsTrace != nil {
        s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
    }
    if t.inTapHandle != nil {
        var err error
        info := &tap.Info{
            FullMethodName: state.data.method,
        }
        s.ctx, err = t.inTapHandle(s.ctx, info)
        if err != nil {
            warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
            t.controlBuf.put(&cleanupStream{
                streamID: s.id,
                rst:      true,
                rstCode:  http2.ErrCodeRefusedStream,
                onWrite:  func() {},
            })
            s.cancel()
            return false
        }
    }
    t.mu.Lock()
    if t.state != reachable {
        t.mu.Unlock()
        s.cancel()
        return false
    }
    if uint32(len(t.activeStreams)) >= t.maxStreams {
        t.mu.Unlock()
        t.controlBuf.put(&cleanupStream{
            streamID: streamID,
            rst:      true,
            rstCode:  http2.ErrCodeRefusedStream,
            onWrite:  func() {},
        })
        s.cancel()
        return false
    }
    if streamID%2 != 1 || streamID <= t.maxStreamID {
        t.mu.Unlock()
        // illegal gRPC stream id.
        errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
        s.cancel()
        return true
    }
    t.maxStreamID = streamID
    t.activeStreams[streamID] = s
    if len(t.activeStreams) == 1 {
        t.idle = time.Time{}
    }
    t.mu.Unlock()
    if channelz.IsOn() {
        atomic.AddInt64(&t.czData.streamsStarted, 1)
        atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
    }
    s.requestRead = func(n int) {
        t.adjustWindow(s, uint32(n))
    }
    s.ctx = traceCtx(s.ctx, s.method)
    if t.stats != nil {
        s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
        inHeader := &stats.InHeader{
            FullMethod:  s.method,
            RemoteAddr:  t.remoteAddr,
            LocalAddr:   t.localAddr,
            Compression: s.recvCompress,
            WireLength:  int(frame.Header().Length),
            Header:      metadata.MD(state.data.mdata).Copy(),
        }
        t.stats.HandleRPC(s.ctx, inHeader)
    }
    s.ctxDone = s.ctx.Done()
    s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
    s.trReader = &transportReader{
        reader: &recvBufferReader{
            ctx:        s.ctx,
            ctxDone:    s.ctxDone,
            recv:       s.buf,
            freeBuffer: t.bufferPool.put,
        },
        windowHandler: func(n int) {
            t.updateWindow(s, uint32(n))
        },
    }
    // Register the stream with loopy.
    t.controlBuf.put(&registerStream{
        streamID: s.id,
        wq:       s.wq,
    })
    handle(s)
    return false
}
When a HEADERS frame arrives, a new rpc request has begun: the frame is decoded and a Stream is created, and during stream creation any user-defined header fields are saved into stream.ctx as incoming metadata.
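The handler side of that metadata flow reads it back with metadata.FromIncomingContext. A self-contained sketch, where NewIncomingContext stands in for what operateHeaders does with state.data.mdata:

package main

import (
    "context"
    "fmt"

    "google.golang.org/grpc/metadata"
)

func main() {
    // operateHeaders effectively does this with the decoded header fields
    ctx := metadata.NewIncomingContext(context.Background(),
        metadata.Pairs("authorization", "Bearer token123"))

    // and a handler reads the custom headers back like this
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        fmt.Println(md.Get("authorization")) // [Bearer token123]
    }
}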
Finally, let's look at how the response gets back to the client:
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
    // serialize the response message
    data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
    if err != nil {
        channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
        return err
    }
    // compress it
    compData, err := compress(data, cp, comp)
    if err != nil {
        channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
        return err
    }
    // build the 5-byte message header
    hdr, payload := msgHeader(data, compData)
    // TODO(dfawley): should we be checking len(data) instead?
    if len(payload) > s.opts.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
    }
    // write it to the stream
    err = t.Write(stream, hdr, payload, opts)
    if err == nil && s.opts.statsHandler != nil {
        s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
    }
    return err
}
// Write converts the data into HTTP2 data frame and sends it out. Non-nil
// error is returned if it fails (e.g., framing error, transport error).
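The msgHeader call in sendResponse builds the same 5-byte prefix we parsed on the receive side. A sketch that mirrors the unexported grpc-go helper (this is an illustration, not the library code itself):

package main

import (
    "encoding/binary"
    "fmt"
)

// buildHeader picks the (possibly compressed) payload and prefixes it with
// the compressed flag plus the big-endian payload length.
func buildHeader(data, compData []byte) (hdr []byte, payload []byte) {
    hdr = make([]byte, 5)
    payload = data
    if compData != nil {
        hdr[0] = 1 // compression flag set
        payload = compData
    }
    binary.BigEndian.PutUint32(hdr[1:], uint32(len(payload)))
    return hdr, payload
}

func main() {
    hdr, payload := buildHeader([]byte("abc"), nil)
    fmt.Println(hdr, string(payload)) // [0 0 0 0 3] abc
}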